#!/usr/bin/python3
# ******************************************************************************
# Copyright (c) Huawei Technologies Co., Ltd. 2022-2022. All rights reserved.
# licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#     http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# ******************************************************************************/
import json
import os
import unittest
from unittest import mock

from ceres.conf.constant import CommandExitCode
from ceres.function.status import (
    FAIL,
    PARAM_ERROR,
    REPO_CONTENT_INCORRECT,
    REPO_NOT_SET,
    SERVICE_NOT_EXIST,
    SUCCESS,
    COMMAND_EXEC_ERROR,
)
from ceres.manages.vulnerability_manage import VulnerabilityManage
from ceres.function.check import PreCheck


class TestVulnerabilityManage(unittest.TestCase):
    def setUp(self) -> None:
        """Reset the class-level RPM lookup tables and hotpatch key set before each test."""
        installed_kernel = "kernel-4.19.90-2112.8.0.0132.oe2203.x86_64"
        available_kernel = "kernel-4.19.90-2112.8.0.0152.oe2203.x86_64"
        VulnerabilityManage.installed_rpm_info = dict(kernel=installed_kernel)
        VulnerabilityManage.available_rpm_info = dict(kernel=available_kernel)
        # A fresh set per test so keys added by one test do not leak into the next.
        VulnerabilityManage.available_hotpatch_key_set = set()

    @mock.patch.object(os, "remove")
    @mock.patch("builtins.open", mock.mock_open())
    @mock.patch.object(VulnerabilityManage, "_validate_repo_source")
    def test_repo_set_should_return_success_when_input_repo_content_can_be_used_by_yum(
        self, validate_source_stub, remove_stub
    ):
        """repo_set is expected to return SUCCESS when repo source validation is stubbed to pass.

        File access (``open``) and ``os.remove`` are patched, so nothing touches the disk.
        """
        remove_stub.return_value = ''
        validate_source_stub.return_value = True
        repo_info = {
            "repo_name": "mock_name",
            "dest": "/etc/yum.repos.d/mock.repo",
            "repo_content": "mock_content",
        }
        request = {"repo_info": repo_info, "check_items": [], "check": False}
        self.assertEqual(SUCCESS, VulnerabilityManage().repo_set(request))

    @mock.patch.object(os, "remove")
    @mock.patch("builtins.open", mock.mock_open())
    @mock.patch.object(VulnerabilityManage, "_validate_repo_source")
    def test_repo_set_should_return_repo_content_is_incorrect_when_repo_content_cannot_be_used_by_yum(
        self, validate_source_stub, remove_stub
    ):
        """repo_set is expected to return REPO_CONTENT_INCORRECT when validation is stubbed to fail.

        File access (``open``) and ``os.remove`` are patched, so nothing touches the disk.
        """
        remove_stub.return_value = ''
        validate_source_stub.return_value = False
        # NOTE(review): "check" is nested inside "repo_info" here, while the success-path
        # test passes it at the top level of the request — confirm which one repo_set reads.
        repo_info = {
            "repo_name": "mock_name",
            "dest": "/etc/yum.repos.d/mock.repo",
            "repo_content": "mock_content",
            "check": False,
        }
        request = {"repo_info": repo_info, "check_items": []}
        self.assertEqual(REPO_CONTENT_INCORRECT, VulnerabilityManage().repo_set(request))

    def test_repo_set_should_return_param_error_when_repo_save_path_is_incorrect(self):
        """repo_set is expected to return PARAM_ERROR for a "dest" value of "mock_dest"."""
        # NOTE(review): "check" sits inside "repo_info" here; confirm that is where
        # repo_set looks for it.
        repo_info = {
            "repo_name": "mock_name",
            "dest": "mock_dest",
            "repo_content": "mock_content",
            "check": False,
        }
        request = {"repo_info": repo_info, "check_items": []}
        self.assertEqual(PARAM_ERROR, VulnerabilityManage().repo_set(request))

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_validate_repo_source_should_return_true_when_query_repo_info_succeed(self, mock_execute_shell_command):
        """_validate_repo_source should return True when the shell command is stubbed to succeed."""
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "", "")
        self.assertEqual(True, VulnerabilityManage._validate_repo_source('update'))

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_validate_repo_source_should_return_false_when_shell_command_execute_failed(
        self, mock_execute_shell_command
    ):
        """_validate_repo_source should return False when the shell command is stubbed to fail."""
        mock_execute_shell_command.return_value = (CommandExitCode.FAIL, "", "")
        validation_result = VulnerabilityManage._validate_repo_source('update')
        self.assertEqual(False, validation_result)

    @mock.patch.object(VulnerabilityManage, "_query_fixed_cves")
    @mock.patch.object(VulnerabilityManage, "_query_unfixed_cves_by_dnf")
    @mock.patch.object(VulnerabilityManage, "_query_unfixed_cves_by_dnf_plugin")
    @mock.patch.object(PreCheck, "execute_check")
    def test_cve_scan_should_return_cve_info_with_items_check_result_when_all_is_right(
        self, mock_check_items, mock_dnf_plugin_cves, mock_dnf_cves, mock_fixed_cves
    ):
        """cve_scan should return SUCCESS with empty CVE lists when every query is stubbed empty.

        Patch decorators are applied bottom-up, so the mock arguments arrive in the
        order: pre-check, dnf-plugin query, plain dnf query, fixed-CVE query.
        """
        mock_check_items.return_value = (True, [])
        for query_stub in (mock_dnf_plugin_cves, mock_dnf_cves, mock_fixed_cves):
            query_stub.return_value = []

        scan_status, scan_report = VulnerabilityManage().cve_scan({"check_items": []})

        expected_report = {
            "check_items": [],
            "unfixed_cves": [],
            "fixed_cves": [],
        }
        self.assertEqual((SUCCESS, expected_report), (scan_status, scan_report))

    @mock.patch.object(PreCheck, "execute_check")
    def test_cve_scan_should_return_items_check_result_when_precheck_items_failed(self, mock_check_items):
        """cve_scan should return FAIL with only the check-item log when the pre-check is stubbed to fail."""
        mock_check_items.return_value = (False, [])
        scan_outcome = VulnerabilityManage().cve_scan({"check_items": []})
        self.assertEqual((FAIL, {"check_items": []}), scan_outcome)

    @mock.patch.object(VulnerabilityManage, "_query_fixed_cves")
    @mock.patch.object(VulnerabilityManage, "_query_unfixed_cves_by_dnf")
    @mock.patch.object(VulnerabilityManage, "_query_unfixed_cves_by_dnf_plugin")
    @mock.patch.object(PreCheck, "execute_check")
    def test_cve_scan_should_return_coldpatch_cve_info_with_items_check_result_when_query_hotpatch_cves_failed(
        self, mock_check_items, mock_dnf_plugin_cves, mock_dnf_cves, mock_fixed_cves
    ):
        """cve_scan should report the coldpatch CVE when the dnf-plugin query yields nothing.

        Patch decorators are applied bottom-up, so the second mock argument is the
        ``_query_unfixed_cves_by_dnf_plugin`` patch and the third is the
        ``_query_unfixed_cves_by_dnf`` patch. The plugin query is stubbed to return an
        empty list (the "hotpatch query failed" case named by this test) and the plain
        dnf query supplies the coldpatch record.
        NOTE(review): presumably cve_scan uses the plain dnf query result when the
        plugin query returns nothing — confirm against the implementation.
        """
        coldpatch_cve = {
            "cve_id": "CVE-2023-1513",
            "installed_rpm": "kernel-4.19.90-2304.1.0.0131.oe1.x86_64",
            "available_rpm": "kernel-4.19.90-2304.1.0.0196.oe1.x86_64",
            "support_way": "coldpatch",
        }
        mock_check_items.return_value = True, []
        mock_dnf_plugin_cves.return_value = []
        mock_dnf_cves.return_value = [coldpatch_cve]
        mock_fixed_cves.return_value = []
        mock_args = {"check_items": []}
        # Copy the record so the expectation does not alias the object handed to the mock.
        expected_res = SUCCESS, {
            "check_items": [],
            "unfixed_cves": [dict(coldpatch_cve)],
            "fixed_cves": [],
        }
        self.assertEqual(expected_res, VulnerabilityManage().cve_scan(mock_args))

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_installed_packages_should_return_installed_packages_info_dict_when_query_succeed(
        self, mock_execute_shell_command
    ):
        """_query_installed_rpm should turn the stubbed name/NEVRA pairs into a dict."""
        rpm_query_stdout = """
        "kernel-tools":"kernel-tools-5.10.0-60.92.0.116.oe2203.aarch64",
        "kernel-headers":"kernel-headers-5.10.0-60.92.0.116.oe2203.aarch64",
        "kernel-devel":"kernel-devel-5.10.0-60.92.0.116.oe2203.aarch64",
        "kernel":"kernel-5.10.0-60.92.0.116.oe2203.aarch64","""
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, rpm_query_stdout, "")

        installed = VulnerabilityManage._query_installed_rpm()

        self.assertEqual(
            {
                "kernel-tools": "kernel-tools-5.10.0-60.92.0.116.oe2203.aarch64",
                "kernel-headers": "kernel-headers-5.10.0-60.92.0.116.oe2203.aarch64",
                "kernel-devel": "kernel-devel-5.10.0-60.92.0.116.oe2203.aarch64",
                "kernel": "kernel-5.10.0-60.92.0.116.oe2203.aarch64",
            },
            installed,
        )

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_installed_packages_should_return_empty_dict_when_query_failed(self, mock_execute_shell_command):
        """_query_installed_rpm should return an empty dict when the shell command is stubbed to fail."""
        mock_execute_shell_command.return_value = (CommandExitCode.FAIL, "", "error")
        installed = VulnerabilityManage._query_installed_rpm()
        self.assertEqual({}, installed)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_installed_packages_should_return_empty_dict_when_query_cmd_output_is_null(
        self, mock_execute_shell_command
    ):
        """_query_installed_rpm should return an empty dict when the command succeeds with no output."""
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "", "")
        installed = VulnerabilityManage._query_installed_rpm()
        self.assertEqual({}, installed)

    @mock.patch.object(json, "loads")
    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_installed_packages_should_return_empty_dict_when_query_rpm_succeed_but_json_decode_error(
        self, mock_execute_shell_command, mock_json_loads
    ):
        """_query_installed_rpm should return an empty dict when json.loads is forced to raise JSONDecodeError."""
        decode_error = json.decoder.JSONDecodeError("", "", 0)
        mock_json_loads.side_effect = decode_error
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "", "")
        installed = VulnerabilityManage._query_installed_rpm()
        self.assertEqual({}, installed)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_available_rpm_should_return_all_available_rpm_info_dict_when_query_cmd_execute_succeed(
        self, mock_execute_shell_command
    ):
        mock_cmd_output = """
kernel.x86_64                                      5.10.0-60.105.0.132.oe2203       update
kernel-debuginfo.x86_64                            5.10.0-60.105.0.132.oe2203       update
kernel-debugsource.x86_64                          5.10.0-60.105.0.132.oe2203       update
mock-kernel.x86_64                                 5.10.0-60.105.0.132.oe2203       update
"""
        mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, mock_cmd_output, ""
        expected_result = {
            "kernel": "kernel-5.10.0-60.105.0.132.oe2203.x86_64",
            "kernel-debuginfo": "kernel-debuginfo-5.10.0-60.105.0.132.oe2203.x86_64",
            "kernel-debugsource": "kernel-debugsource-5.10.0-60.105.0.132.oe2203.x86_64",
        }
        self.assertEqual(expected_result, VulnerabilityManage._query_available_rpm())

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_available_rpm_should_return_empty_dict_when_query_cmd_execute_failed(
        self, mock_execute_shell_command
    ):
        """_query_available_rpm should return an empty dict when the shell command is stubbed to fail."""
        mock_execute_shell_command.return_value = (CommandExitCode.FAIL, "", "error")
        available = VulnerabilityManage._query_available_rpm()
        self.assertEqual({}, available)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_available_rpm_should_return_empty_dict_when_query_cmd_execute_succeed_but_without_kernel_packages(
        self, mock_execute_shell_command
    ):
        mock_cmd_output = """
ziplib-debuginfo.x86_64                             0.13.71-3.oe2203                    debuginfo
zziplib-debugsource.x86_64                          0.13.71-3.oe2203                    debuginfo
zziplib-devel.x86_64                                0.13.71-3.oe2203                    everything
"""
        mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, mock_cmd_output, ""
        self.assertEqual({}, VulnerabilityManage._query_available_rpm())

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_unfixed_cves_should_return_unfixed_cve_info_when_command_execute_succeed(
        self, mock_execute_shell_command
    ):
        """_query_unfixed_cves_by_dnf should report each kernel CVE as a coldpatch entry.

        The installed/available RPM names in the expectation are the ones seeded in setUp.
        """
        updateinfo_lines = [
            "Last metadata expiration check: 0:26:36 ago on Mon 07 Aug 2023 10:26:32 AM CST.\n",
            "CVE-2021-43976  Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64\n",
            "CVE-2021-0941   Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(updateinfo_lines), "")

        expected_cves = [
            {
                "cve_id": cve_id,
                "installed_rpm": "kernel-4.19.90-2112.8.0.0132.oe2203.x86_64",
                "available_rpm": "kernel-4.19.90-2112.8.0.0152.oe2203.x86_64",
                "support_way": "coldpatch",
            }
            for cve_id in ("CVE-2021-43976", "CVE-2021-0941")
        ]
        self.assertEqual(expected_cves, VulnerabilityManage()._query_unfixed_cves_by_dnf())

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_unfixed_cves_should_return_empty_list_when_command_execute_fail(self, mock_execute_shell_command):
        """_query_unfixed_cves_by_dnf should return an empty list when the shell command fails.

        The stub reports CommandExitCode.FAIL so the test exercises the failure
        path its name describes.
        """
        mock_execute_shell_command.return_value = CommandExitCode.FAIL, "", ""
        self.assertEqual([], VulnerabilityManage()._query_unfixed_cves_by_dnf())

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_unfixed_cves_should_return_empty_list_when_command_execute_succeed_but_without_kernel(
        self, mock_execute_shell_command
    ):
        """_query_unfixed_cves_by_dnf should return an empty list when no kernel CVE is listed.

        The stub reports CommandExitCode.SUCCEED with output that only mentions
        non-kernel packages, matching the scenario in the test name.
        NOTE(review): assumes the parser skips non-kernel lines — confirm against
        the implementation.
        """
        mock_cmd_output = (
            "Last metadata expiration check: 0:26:36 ago on Mon 07 Aug 2023 10:26:32 AM CST.\n"
            "CVE-2023-0049  Important/Sec. vim-minimal-2:9.0-7.oe2203.x86_64\n"
            "CVE-2023-0051  Important/Sec. vim-minimal-2:9.0-7.oe2203.x86_64\n"
            "CVE-2023-0054  Important/Sec. vim-minimal-2:9.0-7.oe2203.x86_64\n"
            "CVE-2023-0288  Important/Sec. vim-minimal-2:9.0-8.oe2203.x86_64\n"
            "CVE-2022-47024 Important/Sec. vim-minimal-2:9.0-8.oe2203.x86_64\n"
            "CVE-2023-0433  Important/Sec. vim-minimal-2:9.0-9.oe2203.x86_64\n"
            "CVE-2022-4899  Important/Sec. zstd-1.5.0-4.oe2203.x86_64\n"
        )
        mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, mock_cmd_output, ""
        self.assertEqual([], VulnerabilityManage()._query_unfixed_cves_by_dnf())

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_applied_hotpatch_should_return_relation_info_list_when_command_execute_succeed(
        self, mock_execute_shell_command
    ):
        """_query_applied_hotpatch should report an ACTIVED kernel hotpatch as a fixed CVE entry."""
        hotpatch_lines = [
            "Last metadata expiration check: 0:28:36 ago on Mon 07 Aug 2023 10:26:32 AM CST.\n",
            "CVE-id        base-pkg/hotpatch                      status\n",
            "CVE-2023-1513 kernel-4.19.90-2112.8.0.0132.oe1/HP001 ACTIVED\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(hotpatch_lines), "")

        applied = VulnerabilityManage()._query_applied_hotpatch()

        expected_entry = {
            "cve_id": "CVE-2023-1513",
            "installed_rpm": "kernel-4.19.90-2112.8.0.0132.oe2203.x86_64",
            "fix_way": "hotpatch",
            "hp_status": "ACTIVED",
        }
        self.assertEqual([expected_entry], applied)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_applied_hotpatch_should_return_epmty_list_when_command_execute_fail(
        self, mock_execute_shell_command
    ):
        """_query_applied_hotpatch should return an empty list when the shell command is stubbed to fail."""
        mock_execute_shell_command.return_value = (CommandExitCode.FAIL, "", "")
        applied = VulnerabilityManage()._query_applied_hotpatch()
        self.assertEqual([], applied)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_applied_hotpatch_should_return_epmty_list_when_command_execute_succeed_but_without_kernel(
        self, mock_execute_shell_command
    ):
        """_query_applied_hotpatch should return an empty list for the stubbed redis-only NOT-APPLIED output."""
        hotpatch_lines = [
            "Last metadata expiration check: 1:23:42 ago on Thu 10 Aug 2023 10:57:14 PM CST.\n",
            "CVE-id        base-pkg/hotpatch   status\n",
            "CVE-2023-1111 redis-6.2.5-1/HP001 NOT-APPLIED\n",
            "CVE-2023-1112 redis-6.2.5-1/HP001 NOT-APPLIED\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(hotpatch_lines), "")
        applied = VulnerabilityManage()._query_applied_hotpatch()
        self.assertEqual([], applied)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_applied_hotpatch_should_return_epmty_list_when_command_execute_succeed_but_cve_is_not_fixed_completely(
        self, mock_execute_shell_command
    ):
        """_query_applied_hotpatch should return an empty list when one hotpatch of the CVE is NOT-APPLIED.

        The CVE key is registered in available_hotpatch_key_set before the query runs.
        """
        hotpatch_lines = [
            "Last metadata expiration check: 1:23:42 ago on Thu 10 Aug 2023 10:57:14 PM CST.\n",
            "CVE-id        base-pkg/hotpatch                                                 status\n",
            "CVE-2023-1111 redis-6.2.5-1/ACC-1-1/redis-benchmark                             ACTIVED\n",
            "CVE-2023-1111 redis-6.2.5-1/ACC-1-1/redis-cli                                   ACTIVED\n",
            "CVE-2023-1111 redis-6.2.5-1/ACC-1-1/redis-server                                NOT-APPLIED\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(hotpatch_lines), "")
        VulnerabilityManage.available_hotpatch_key_set.add("CVE-2023-1111-redis")

        applied = VulnerabilityManage()._query_applied_hotpatch()

        self.assertEqual([], applied)

    @mock.patch.object(VulnerabilityManage, "_query_applied_hotpatch")
    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_fixed_cves_should_return_fixed_cve_info_when_command_execute_succeed(
        self, mock_execute_shell_command, mock_applied_hotpatch
    ):
        """_query_fixed_cves should report each listed kernel CVE as fixed by coldpatch.

        The applied-hotpatch query is stubbed to return nothing, so only the
        coldpatch entries are expected.
        """
        updateinfo_lines = [
            "Last metadata expiration check: 0:26:36 ago on Mon 07 Aug 2023 10:26:32 AM CST.\n",
            "CVE-2021-43976  Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64\n",
            "CVE-2021-0941   Important/Sec. kernel-4.19.90-2201.1.0.0132.oe1.x86_64\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(updateinfo_lines), "")
        mock_applied_hotpatch.return_value = []

        expected_cves = [
            {
                'cve_id': cve_id,
                'fix_way': 'coldpatch',
                'installed_rpm': 'kernel-4.19.90-2112.8.0.0132.oe2203.x86_64',
            }
            for cve_id in ('CVE-2021-43976', 'CVE-2021-0941')
        ]
        self.assertEqual(expected_cves, VulnerabilityManage()._query_fixed_cves())

    @mock.patch.object(VulnerabilityManage, "_query_applied_hotpatch")
    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_fixed_cves_should_return_hotpatch_applied_info_when_command_execute_failed(
        self, mock_execute_shell_command, mock_applied_hotpatch
    ):
        """_query_fixed_cves should return only the stubbed hotpatch entries when the shell command fails."""
        applied_hotpatches = [
            {
                "cve_id": "CVE-2023-1513",
                "installed_rpm": "kernel-4.19.90-2112.8.0.0132.oe2203.x86_64",
                "fix_way": "hotpatch",
                "hp_status": "ACTIVED",
            }
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.FAIL, "", "")
        mock_applied_hotpatch.return_value = applied_hotpatches
        fixed_cves = VulnerabilityManage()._query_fixed_cves()
        self.assertEqual(applied_hotpatches, fixed_cves)

    @mock.patch.object(VulnerabilityManage, "_query_applied_hotpatch")
    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_fixed_cves_should_return_hotpatch_applied_info_when_command_execute_succeed_but_without_kernel(
        self, mock_execute_shell_command, mock_applied_hotpatch
    ):
        """_query_fixed_cves should return only the stubbed hotpatch entries when the output lists no kernel CVE."""
        updateinfo_lines = [
            "CVE-2022-0216  Moderate/Sec.  qemu-img-4.1.0-73.oe1.x86_64\n",
            "CVE-2023-0664  Important/Sec. qemu-img-4.1.0-78.oe1.x86_64\n",
            "CVE-2023-2861  Important/Sec. qemu-img-4.1.0-78.oe1.x86_64\n",
        ]
        applied_hotpatches = [
            {
                "cve_id": "CVE-2023-1513",
                "installed_rpm": "kernel-4.19.90-2112.8.0.0132.oe2203.x86_64",
                "fix_way": "hotpatch",
                "hp_status": "ACTIVED",
            }
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(updateinfo_lines), "")
        mock_applied_hotpatch.return_value = applied_hotpatches
        fixed_cves = VulnerabilityManage()._query_fixed_cves()
        self.assertEqual(applied_hotpatches, fixed_cves)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_unfixed_cves_by_dnf_plugin_should_return_unfixed_cves_info_when_command_execute_succeed(
        self, mock_execute_shell_command
    ):
        """A kernel CVE row with both a coldpatch and a hotpatch column should yield two entries.

        The expected list puts the hotpatch entry first, then the coldpatch entry.
        """
        plugin_lines = [
            "Example of command execution result:\n",
            "Last metadata expiration check: 0:31:50 ago on Mon 07 Aug 2023 10:26:32 AM CST.\n",
            "CVE-2023-1981   Moderate/Sec.  avahi-libs-0.8-9.oe1.x86_64                     -\n",
            "CVE-2021-42574  Important/Sec. binutils-2.34-19.oe1.x86_64                     -\n",
            "CVE-2023-1513   Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64         patch-kernel-4.19.90-2112\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(plugin_lines), "")

        hotpatch_entry = {
            "cve_id": "CVE-2023-1513",
            "installed_rpm": "kernel-4.19.90-2112.8.0.0132.oe2203.x86_64",
            "available_rpm": "patch-kernel-4.19.90-2112",
            "support_way": "hotpatch",
        }
        coldpatch_entry = {
            "cve_id": "CVE-2023-1513",
            "installed_rpm": "kernel-4.19.90-2112.8.0.0132.oe2203.x86_64",
            "available_rpm": "kernel-4.19.90-2112.8.0.0152.oe2203.x86_64",
            "support_way": "coldpatch",
        }
        unfixed = VulnerabilityManage()._query_unfixed_cves_by_dnf_plugin()
        self.assertEqual([hotpatch_entry, coldpatch_entry], unfixed)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_unfixed_cves_by_dnf_plugin_should_return_unfixed_cves_info_when_command_execute_succeed_and_only_has_coldpatch(
        self, mock_execute_shell_command
    ):
        """A kernel CVE row whose hotpatch column is "-" should yield a single coldpatch entry."""
        plugin_lines = [
            "Example of command execution result:\n",
            "Last metadata expiration check: 0:31:50 ago on Mon 07 Aug 2023 10:26:32 AM CST.\n",
            "CVE-2023-1981   Moderate/Sec.  avahi-libs-0.8-9.oe1.x86_64                     -\n",
            "CVE-2021-42574  Important/Sec. binutils-2.34-19.oe1.x86_64                     -\n",
            "CVE-2023-1513   Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64         -\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(plugin_lines), "")

        coldpatch_entry = {
            "cve_id": "CVE-2023-1513",
            "installed_rpm": "kernel-4.19.90-2112.8.0.0132.oe2203.x86_64",
            "available_rpm": "kernel-4.19.90-2112.8.0.0152.oe2203.x86_64",
            "support_way": "coldpatch",
        }
        unfixed = VulnerabilityManage()._query_unfixed_cves_by_dnf_plugin()
        self.assertEqual([coldpatch_entry], unfixed)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_unfixed_cves_by_dnf_plugin_should_return_unfixed_cves_info_when_command_execute_succeed_and_some_cve_has_many_unavailable_rpms(
        self, mock_execute_shell_command
    ):
        """Repeated rows for one CVE should collapse to one coldpatch entry plus one entry per hotpatch.

        The stubbed output also covers a CVE with neither column filled (expected as
        an entry with empty fields) and a CVE with only a hotpatch column.
        """
        plugin_lines = [
            "Example of command execution result:\n",
            "Last metadata expiration check: 0:31:50 ago on Mon 07 Aug 2023 10:26:32 AM CST.\n",
            "CVE-2023-1513  Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64         -\n",
            "CVE-2023-1513  Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64         -\n",
            "CVE-2023-1513  Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64         -\n",
            "CVE-2023-1513  Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64         -\n",
            "CVE-2023-1513  Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64         patch-kernel-4.19.90-2112.../ACC\n",
            "CVE-2023-1513  Important/Sec. kernel-4.19.90-2304.1.0.0196.oe1.x86_64         patch-kernel-4.19.90-2118.../SGL\n",
            "CVE-2023-1514  Important/Sec. -                                               -\n",
            "CVE-2023-1515  Important/Sec. -                                               patch-kernel-4.19.90-2120-SGL_CVE_2023_1111_CVE_2023_1112-1-1.x86_64\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(plugin_lines), "")

        installed_kernel = "kernel-4.19.90-2112.8.0.0132.oe2203.x86_64"
        expected_cves = [
            {
                "cve_id": "CVE-2023-1513",
                "installed_rpm": installed_kernel,
                "available_rpm": "kernel-4.19.90-2112.8.0.0152.oe2203.x86_64",
                "support_way": "coldpatch",
            },
            {
                "cve_id": "CVE-2023-1513",
                "installed_rpm": installed_kernel,
                "available_rpm": "patch-kernel-4.19.90-2112.../ACC",
                "support_way": "hotpatch",
            },
            {
                "cve_id": "CVE-2023-1513",
                "installed_rpm": installed_kernel,
                "available_rpm": "patch-kernel-4.19.90-2118.../SGL",
                "support_way": "hotpatch",
            },
            {
                "cve_id": "CVE-2023-1514",
                "installed_rpm": "",
                "available_rpm": "",
                "support_way": "",
            },
            {
                "cve_id": "CVE-2023-1515",
                "installed_rpm": installed_kernel,
                "available_rpm": "patch-kernel-4.19.90-2120-SGL_CVE_2023_1111_CVE_2023_1112-1-1.x86_64",
                "support_way": "hotpatch",
            },
        ]
        unfixed = VulnerabilityManage()._query_unfixed_cves_by_dnf_plugin()
        self.assertEqual(expected_cves, unfixed)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_unfixed_cves_by_dnf_plugin_should_return_unfixed_cves_info_when_command_execute_succeed_and_only_has_hotpatch(
        self, mock_execute_shell_command
    ):
        """A CVE row with only the hotpatch column filled should yield a single hotpatch entry."""
        plugin_lines = [
            "Example of command execution result:\n",
            "Last metadata expiration check: 0:31:50 ago on Mon 07 Aug 2023 10:26:32 AM CST.\n",
            "CVE-2023-1981   Moderate/Sec.  avahi-libs-0.8-9.oe1.x86_64                     -\n",
            "CVE-2021-42574  Important/Sec. binutils-2.34-19.oe1.x86_64                     -\n",
            "CVE-2023-1513   Important/Sec. -                                               patch-kernel-4.19.90-2120-SGL_CVE_2023_1111_CVE_2023_1112-1-1.x86_64\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(plugin_lines), "")

        hotpatch_entry = {
            "cve_id": "CVE-2023-1513",
            "installed_rpm": "kernel-4.19.90-2112.8.0.0132.oe2203.x86_64",
            "available_rpm": "patch-kernel-4.19.90-2120-SGL_CVE_2023_1111_CVE_2023_1112-1-1.x86_64",
            "support_way": "hotpatch",
        }
        unfixed = VulnerabilityManage()._query_unfixed_cves_by_dnf_plugin()
        self.assertEqual([hotpatch_entry], unfixed)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_unfixed_cves_by_dnf_plugin_should_return_unfixed_cves_info_when_command_execute_succeed_and_only_has_cve_id(
        self, mock_execute_shell_command
    ):
        """A CVE row with "-" in both package columns should yield an entry with empty fields."""
        plugin_lines = [
            "Example of command execution result:\n",
            "Last metadata expiration check: 0:31:50 ago on Mon 07 Aug 2023 10:26:32 AM CST.\n",
            "CVE-2023-1981   Moderate/Sec.  avahi-libs-0.8-9.oe1.x86_64                     -\n",
            "CVE-2021-42574  Important/Sec. binutils-2.34-19.oe1.x86_64                     -\n",
            "CVE-2023-1513   Important/Sec. -                                               -\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(plugin_lines), "")

        bare_entry = {
            "cve_id": "CVE-2023-1513",
            "installed_rpm": "",
            "available_rpm": "",
            "support_way": "",
        }
        unfixed = VulnerabilityManage()._query_unfixed_cves_by_dnf_plugin()
        self.assertEqual([bare_entry], unfixed)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_unfixed_cves_by_dnf_plugin_should_return_empty_list_when_command_execute_succeed_without_kernel_info(
        self, mock_execute_shell_command
    ):
        """The plugin query should return an empty list when the output lists only non-kernel packages."""
        plugin_lines = [
            "Example of command execution result:\n",
            "Last metadata expiration check: 0:31:50 ago on Mon 07 Aug 2023 10:26:32 AM CST.\n",
            "CVE-2023-1981   Moderate/Sec.  avahi-libs-0.8-9.oe1.x86_64                     -\n",
            "CVE-2021-42574  Important/Sec. binutils-2.34-19.oe1.x86_64                     -\n",
        ]
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "".join(plugin_lines), "")
        unfixed = VulnerabilityManage()._query_unfixed_cves_by_dnf_plugin()
        self.assertEqual([], unfixed)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_query_unfixed_cves_by_dnf_plugin_should_return_empty_list_when_command_execute_failed(
        self, mock_execute_shell_command
    ):
        """The plugin query should return an empty list when the shell command is stubbed to fail."""
        mock_execute_shell_command.return_value = (CommandExitCode.FAIL, "", "")
        unfixed = VulnerabilityManage()._query_unfixed_cves_by_dnf_plugin()
        self.assertEqual([], unfixed)

    @mock.patch.object(PreCheck, "execute_check")
    def test_cve_fix_should_return_fail_when_pre_check_is_failed(self, mock_pre_check):
        """cve_fix should return FAIL and mark every requested RPM as failed when the pre-check fails."""
        failed_check_log = {"item": "network", "result": False, "log": "check failed"}
        mock_pre_check.return_value = (False, failed_check_log)

        coldpatch_rpm = {
            "installed_rpm": "kernel-4.19xxx",
            "available_rpm": "kernel-4.19xxx-new-release",
            "fix_way": "coldpatch",
        }
        fix_request = {
            "check_items": ["network"],
            "accepted": True,
            "takeover": True,
            "cves": [{"cve_id": "cve1", "rpms": [coldpatch_rpm]}],
        }

        expected_rpm_result = {
            'installed_rpm': 'kernel-4.19xxx',
            'log': 'pre-check items check failed',
            'result': 'fail',
        }
        expected_report = {
            'check_items': {'item': 'network', 'log': 'check failed', 'result': False},
            'cves': [{'cve_id': 'cve1', 'result': 'failed', 'rpms': [expected_rpm_result]}],
        }
        self.assertEqual((FAIL, expected_report), VulnerabilityManage().cve_fix(fix_request))

    @mock.patch.object(VulnerabilityManage, "_update_rpm_by_dnf")
    @mock.patch.object(PreCheck, "execute_check")
    def test_cve_fix_should_return_cve_fix_result_when_pre_check_is_successful(self, mock_pre_check, mock_update_rpm):
        """cve_fix should report per-RPM results and an overall FAIL when one RPM update fails.

        ``_update_rpm_by_dnf`` is stubbed to succeed on its first call and fail on
        its second.
        """
        passed_check_log = [{"item": "network", "result": True, "log": "check normal"}]
        mock_pre_check.return_value = (True, passed_check_log)
        mock_update_rpm.side_effect = [(SUCCESS, "update successful"), (FAIL, "update failed")]

        requested_rpms = [
            {
                "installed_rpm": "kernel-4.19xxx",
                "available_rpm": "kernel-4.19xxx-new-release",
                "fix_way": "coldpatch",
            },
            {
                "installed_rpm": "kernel-4.20xxx",
                "available_rpm": "patch-kernel-4.19xxx-release-ACC",
                "fix_way": "hotpatch",
            },
        ]
        fix_request = {
            "check_items": ["network"],
            "accepted": True,
            "takeover": True,
            "cves": [{"cve_id": "cve1", "rpms": requested_rpms}],
        }

        expected_rpm_results = [
            {"installed_rpm": "kernel-4.19xxx", "result": SUCCESS, "log": "update successful"},
            {"installed_rpm": "kernel-4.20xxx", "result": FAIL, "log": "update failed"},
        ]
        expected_report = {
            "check_items": passed_check_log,
            "cves": [{"cve_id": "cve1", "result": FAIL, "rpms": expected_rpm_results}],
        }
        self.assertEqual((FAIL, expected_report), VulnerabilityManage().cve_fix(fix_request))

    @mock.patch.object(VulnerabilityManage, '_update_coldpatch_by_dnf')
    def test_update_rpm_by_dnf_should_return_cve_fix_result_when_fix_way_is_coldpatch(self, mock_update_coldpatch):
        """
        scene simulation: fix_way is coldpatch and the patched _update_coldpatch_by_dnf reports SUCCESS
        """
        mock_update_coldpatch.return_value = (SUCCESS, "update package successful")
        rpm_info = {
            "installed_rpm": "kernel-4.19xxx",
            "available_rpm": "kernel-4.19xxx-new-release",
            "fix_way": "coldpatch",
        }
        outcome = VulnerabilityManage()._update_rpm_by_dnf(rpm_info)
        # Only the status element of the result is checked here.
        self.assertEqual(SUCCESS, outcome[0])

    @mock.patch.object(VulnerabilityManage, '_update_hotpatch_by_dnf_plugin')
    def test_update_rpm_by_dnf_should_return_cve_fix_result_when_fix_way_is_hotpatch(self, mock_update_hotpatch):
        """
        scene simulation: fix_way is hotpatch and the patched _update_hotpatch_by_dnf_plugin reports SUCCESS
        """
        mock_update_hotpatch.return_value = SUCCESS, "update package successful"
        mock_args = {
            "installed_rpm": "kernel-4.19xxx",
            "available_rpm": "kernel-4.19xxx-new-release",
            "fix_way": "hotpatch",
        }
        # Only the status element of the result is checked here.
        self.assertEqual(SUCCESS, VulnerabilityManage()._update_rpm_by_dnf(mock_args)[0])

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_update_coldpatch_by_dnf_should_return_success_when_update_package_succeed(
        self, mock_execute_shell_command
    ):
        """
        scene simulation: the mocked shell command returns a succeed exit code together with
        upgrade output that ends with "Complete!"
        """
        dnf_output = """
        Last metadata expiration check: 2:40:52 ago on Thu 06 Jul 2023 08:07:20 AM CST.
        Dependencies resolved.
        ================================================================================
        Package Architecture Version Repository Size
        ================================================================================
        Upgrading:
        apr aarch64 1.7.0-6.oe2203 aops-update 104 k

        Transaction Summary
        ================================================================================
        Upgrade 1 Package

        Total download size: 104 k
        Downloading Packages:
        apr-1.7.0-6.oe2203.aarch64.rpm 402 kB/s | 104 kB 00:00
        --------------------------------------------------------------------------------
        Total 399 kB/s | 104 kB 00:00
        Running transaction check
        Transaction check succeeded.
        Running transaction test
        Transaction test succeeded.
        Running transaction
        Preparing : 1/1
        Running scriptlet: apr-1.7.0-6.oe2203.aarch64 1/2
        Upgrading : apr-1.7.0-6.oe2203.aarch64 1/2
        Running scriptlet: apr-1.7.0-6.oe2203.aarch64 1/2
        Running scriptlet: apr-1.7.0-4.oe2203.aarch64 2/2
        Cleanup : apr-1.7.0-4.oe2203.aarch64 2/2
        Running scriptlet: apr-1.7.0-4.oe2203.aarch64 2/2
        Verifying : apr-1.7.0-6.oe2203.aarch64 1/2
        Verifying : apr-1.7.0-4.oe2203.aarch64 2/2

        Upgraded:
        apr-1.7.0-6.oe2203.aarch64

        Complete!
        """
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, dnf_output, "")
        update_result = VulnerabilityManage()._update_coldpatch_by_dnf("apr")
        # The command output is expected to be handed back unchanged as the log.
        self.assertEqual((SUCCESS, dnf_output), update_result)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_update_coldpatch_by_dnf_should_return_fail_when_execute_update_command_failed(
        self, mock_execute_shell_command
    ):
        """
        scene simulation: the mocked shell command returns a fail exit code with the message "error"
        """
        mock_execute_shell_command.return_value = (CommandExitCode.FAIL, "", "error")
        update_result = VulnerabilityManage()._update_coldpatch_by_dnf("apr")
        self.assertEqual((FAIL, "error"), update_result)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_update_coldpatch_by_dnf_should_return_fail_when_execute_command_succeed_but_update_package_failed(
        self, mock_execute_shell_command
    ):
        """
        scene simulation: the mocked shell command returns a succeed exit code, but its output
        is an error message instead of a finished upgrade
        """
        failure_output = "Error: Failed to download metadata for repo 'OS': Cannot download \
            repomd.xml: Cannot download repodata/repomd.xml: All mirrors were tried"
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, failure_output, "")
        update_result = VulnerabilityManage()._update_coldpatch_by_dnf("apr")
        self.assertEqual((FAIL, failure_output), update_result)

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_update_hotpatch_by_dnf_plugin_should_return_succeed_when_takeover_is_true_and_command_execute_succeed(
        self, mock_execute_shell_command
    ):
        """
        scene simulation: takeover is true and the mocked shell command returns a succeed exit code
        """
        mock_stdout = """
        Last metadata expiration check: 1:05:28 ago on Wed 05 Jul 2023 11:34:33 AM CST.
        Dependencies resolved.
        ================================================================================
        Package Arch Version Repository Size
        ================================================================================
        Installing:
        patch-redis-6.2.5-1-HP002 x86_64 1-1 local_hotpatch_new 163 k

        Transaction Summary
        ================================================================================
        Install 1 Package

        Total size: 163 k
        Installed size: 822 k
        Downloading Packages:
        Running transaction check
        Transaction check succeeded.
        Running transaction test
        Transaction test succeeded.
        Running transaction
        Preparing : 1/1
        Installing : patch-redis-6.2.5-1-HP002-1-1.x86_64 1/1
        Running scriptlet: patch-redis-6.2.5-1-HP002-1-1.x86_64 1/1
        Verifying : patch-redis-6.2.5-1-HP002-1-1.x86_64 1/1

        Installed:
        patch-redis-6.2.5-1-HP002-1-1.x86_64

        Complete!
        Applying hot patch
        Apply hot patch succeed: redis-6.2.5-1/HP002.
        """
        mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, mock_stdout, ""
        # Patch the class attribute instead of assigning it, so the previous value is restored
        # when the block exits and the setting cannot leak into other test cases.
        with mock.patch.object(VulnerabilityManage, "takeover", True, create=True):
            self.assertEqual(
                (SUCCESS, mock_stdout),
                VulnerabilityManage()._update_hotpatch_by_dnf_plugin("patch-redis-6.2.5-1-HP002-1-1.x86_64"),
            )

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_update_hotpatch_by_dnf_plugin_should_return_succeed_when_takeover_and_accepted_is_false_and_command_execute_succeed(
        self, mock_execute_shell_command
    ):
        """
        scene simulation: takeover is false, accepted is false, and the mocked shell command
        returns a succeed exit code
        """
        mock_stdout = """
        Last metadata expiration check: 1:05:28 ago on Wed 05 Jul 2023 11:34:33 AM CST.
        Dependencies resolved.
        ================================================================================
        Package Arch Version Repository Size
        ================================================================================
        Installing:
        patch-redis-6.2.5-1-HP002 x86_64 1-1 local_hotpatch_new 163 k

        Transaction Summary
        ================================================================================
        Install 1 Package

        Total size: 163 k
        Installed size: 822 k
        Downloading Packages:
        Running transaction check
        Transaction check succeeded.
        Running transaction test
        Transaction test succeeded.
        Running transaction
        Preparing : 1/1
        Installing : patch-redis-6.2.5-1-HP002-1-1.x86_64 1/1
        Running scriptlet: patch-redis-6.2.5-1-HP002-1-1.x86_64 1/1
        Verifying : patch-redis-6.2.5-1-HP002-1-1.x86_64 1/1

        Installed:
        patch-redis-6.2.5-1-HP002-1-1.x86_64

        Complete!
        Applying hot patch
        Apply hot patch succeed: redis-6.2.5-1/HP002.
        """
        mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, mock_stdout, ""
        # Patch both class attributes instead of assigning them, so their previous values are
        # restored when the block exits and the settings cannot leak into other test cases.
        with mock.patch.multiple(VulnerabilityManage, create=True, takeover=False, accepted=False):
            self.assertEqual(
                (SUCCESS, mock_stdout),
                VulnerabilityManage()._update_hotpatch_by_dnf_plugin("patch-redis-6.2.5-1-HP002-1-1.x86_64"),
            )

    @mock.patch.object(VulnerabilityManage, "_set_hotpatch_status_by_dnf_plugin")
    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_update_hotpatch_by_dnf_plugin_should_return_succeed_when_command_execute_succeed_and_takeover_is_false(
        self, mock_execute_shell_command, mock_change_patch_status
    ):
        """
        scene simulation: takeover is false, accepted is true, and setting the patch status to
        accepted fails; the failure message is expected to be appended to the returned log
        """
        mock_stdout = """
        Last metadata expiration check: 1:05:28 ago on Wed 05 Jul 2023 11:34:33 AM CST.
        Dependencies resolved.
        ================================================================================
        Package Arch Version Repository Size
        ================================================================================
        Installing:
        patch-redis-6.2.5-1-HP002 x86_64 1-1 local_hotpatch_new 163 k

        Transaction Summary
        ================================================================================
        Install 1 Package

        Total size: 163 k
        Installed size: 822 k
        Downloading Packages:
        Running transaction check
        Transaction check succeeded.
        Running transaction test
        Transaction test succeeded.
        Running transaction
        Preparing : 1/1
        Installing : patch-redis-6.2.5-1-HP002-1-1.x86_64 1/1
        Running scriptlet: patch-redis-6.2.5-1-HP002-1-1.x86_64 1/1
        Verifying : patch-redis-6.2.5-1-HP002-1-1.x86_64 1/1

        Installed:
        patch-redis-6.2.5-1-HP002-1-1.x86_64

        Complete!
        Applying hot patch
        Apply hot patch succeed: redis-6.2.5-1/HP002.
        """
        mock_execute_shell_command.return_value = CommandExitCode.SUCCEED, mock_stdout, ""
        mock_change_patch_status.return_value = False, "patch status 'accepted' set failed"
        # Patch both class attributes instead of assigning them, so their previous values are
        # restored when the block exits and the settings cannot leak into other test cases.
        with mock.patch.multiple(VulnerabilityManage, create=True, takeover=False, accepted=True):
            self.assertEqual(
                (SUCCESS, f"{mock_stdout}\npatch status 'accepted' set failed"),
                VulnerabilityManage()._update_hotpatch_by_dnf_plugin("patch-redis-6.2.5-1-HP002-1-1.x86_64"),
            )

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_set_hotpatch_status_by_dnf_plugin_should_return_true_when_all_patch_status_change_succeed(
        self, mock_execute_shell_command
    ):
        """
        scene simulation: the mocked shell command returns a succeed exit code with empty output
        """
        mock_execute_shell_command.return_value = (CommandExitCode.SUCCEED, "", "")
        outcome = VulnerabilityManage._set_hotpatch_status_by_dnf_plugin("mock_patch", "accept")
        # Only the boolean flag in the first position of the result is checked.
        self.assertEqual(True, outcome[0])

    @mock.patch('ceres.manages.vulnerability_manage.execute_shell_command')
    def test_set_hotpatch_status_by_dnf_plugin_should_return_false_set_patch_status_failed(
        self, mock_execute_shell_command
    ):
        """
        scene simulation: the mocked shell command returns a fail exit code with empty output
        """
        mock_execute_shell_command.return_value = (CommandExitCode.FAIL, "", "")
        outcome = VulnerabilityManage._set_hotpatch_status_by_dnf_plugin("mock_patch", "accept")
        # Only the boolean flag in the first position of the result is checked.
        self.assertEqual(False, outcome[0])
